Open data (part one): Event Discharge Monitor data from Thames Water

code
analysis
EDM
opendata
Author

Leo Kiernan

Published

January 30, 2023

(updated 24th March 2023 to include an extra exploration of the historic status endpoint; see Section 6)

Overview

This blog explores open-data: specifically, a recently published open data-set from Thames Water on Event Discharge Monitors (EDMs).

Spoiler Alert: This blog is only here to highlight the existence of the new open-data service and as a ‘hello world’ example for getting started using R. I set myself the goal of emulating the map of EDM data published by Thames Water but using the open data API and open-source tools. If you just want to see the final map, then jump to Section 5.4. If you want to see how I did it, read on…

Background

The UK water industry has recently embarked on an open-data initiative. Thames Water has created a portal Thames Water Open Data Portal through which it can publish its open-data using RESTful APIs. The first such API publishes data on Event Discharge Monitors (EDMs). These sensors are placed at the outlets of sewage treatment works and register when untreated sewage is being discharged into the environment. For more information, see the page on river health, which includes detail on the background of EDMs and the data-set that Thames Water has made available. Because of its impact on nature and potentially humans, this data has been available on request for some time. Thames Water has taken steps to allow this data to be more easily accessed in two ways:

  • An interactive map visualizing storm discharge (EDM) data

  • An open API (the subject of this blog) from which the public can collect the underlying data as open-data. The terms of service are available on the portal. There are two API end-points:

    • Discharge to Environment - Current Status. This endpoint provides the near-realtime (live) status of all EDMs: one record per monitor, regardless of whether it is online, offline, discharging or otherwise.

    • Discharge to Environment - Alert Stream. This endpoint allows queries of historic discharges. It provides state-changes for EDMs (e.g. stopping discharging on a certain date), so there can be many, few, or no records for each monitor. The endpoint can be filtered, sliced and diced depending on what the consumer is interested in. For example, users may filter data to a certain location, permit number, or watercourse name; the time-period of interest and geographic area can also be trimmed as required (a sketch of this filtering follows this list).

  • I will be exploring both the current status & alert stream API endpoints. The former is the sort of data you would use to generate a “live” map of discharges; the latter is more useful for analysis of trends over time, or for reporting on the number of discharges per annum.
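
As a taste of that filtering, here is a minimal sketch of a filtered request against the Alert Stream endpoint. The col_1 / operand_1 / value_1 pattern is the one shown in the API examples (and used later in this post); CLIENT_ID and CLIENT_SECRET are created in the next section, and the exact set of filterable columns should be checked against the portal documentation.

Code
# a sketch of a filtered Alert Stream request (filter pattern from the API examples)
res <- httr::GET(
  url = "https://prod-tw-opendata-app.uk-e1.cloudhub.io/data/STE/v1/DischargeAlerts",
  httr::add_headers(client_id = CLIENT_ID, client_secret = CLIENT_SECRET),
  query = list(col_1 = "DateTime", operand_1 = "gt", value_1 = "2022-04-01")
)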

Preparation: Registering with the API

The API is open and free; however, it does require two preparatory steps:

  1. Registration:

    Registration allows Thames Water to communicate with consumers of their open-data about the service. For instance, in the last week (since I’ve been exploring this dataset), I have received one notification that the service was under maintenance and might appear stale for a short while, and another when normal service was resumed. You can register for an API here, then get credentials from here.

  2. Creation of an API key

    The API key(s) are used to authenticate against the service; they are akin to login IDs.

API endpoint 1: The “live” (current) status of EDMs

Step 0: Load some libraries

First there’s the obligatory step of loading the libraries I’ll be using throughout this post:

Code
library(tidyverse)  # this blog uses the tidyverse
library(lubridate)  # I'm sure lubridate has been added to the tidyverse, old habits die hard
library(httr)       # we will use this to collect data from the internet
library(sf)         # I'm going to plot some maps. sf helps with any spatial stuff
library(leaflet)    # I like the interactivity of leaflet, but there are many other packages to plot maps.
source(here::here("../shiny/openRivers/functions.R"))

Step 1: Collect the data from the API

To help new users get started, Thames Water has also provided code snippets in a variety of languages showing how to connect to the API and collect data. I’ve used the #rstats snippet to collect some data (JSON) and convert it into a data-frame as shown below:

Code
# start by gathering sensor data from Thames Water for the Thames

#| you can create and retrieve API tokens from here: https://data.thameswater.co.uk/s/application-listing
#| I like to use keyring:: to keep secrets away from version control
#| So in an unpublished bit of code I have already pushed the secrets as follows:
#| keyring::key_set_with_value(service = "thames_api", username = "CLIENT_ID", password = "<MY ID>")
#| keyring::key_set_with_value(service = "thames_api", username = "CLIENT_SECRET", password = "<MY PW>")

CLIENT_ID <- keyring::key_get(service = "thames_api", "CLIENT_ID")
CLIENT_SECRET <- keyring::key_get(service = "thames_api", "CLIENT_SECRET")
#| modify this URL as desired to access the different end points.
URL <- "https://prod-tw-opendata-app.uk-e1.cloudhub.io/data/STE/v1/DischargeCurrentStatus"
#| add here any query parameters if using them e.g. date filters, leave as '' for none.
PARAMS <- ""

#| send the request
res <- httr::GET(url = URL, 
                 httr::add_headers(client_id = CLIENT_ID, client_secret = CLIENT_SECRET),
                 query = PARAMS)

#| check status and return only valid requests
if (httr::status_code(res) != 200){
  warning(paste("Request failed with status code: ", httr::status_code(res)))
} else {
  #| parse the response
  content <- httr::content(res)
  
  #| extract the edm_raw
  
  #| and return
  edm_raw <- dplyr::bind_rows(content$items)
}
#| it's worth remembering when we collected the data as it could change each time we collect it:
data_collection_dt <- Sys.time()
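
The request above fails soft with a warning; for unattended or scheduled use you might wrap the call in a small retry helper. Here is a minimal sketch (the number of tries and the wait are arbitrary choices):

Code
# a sketch of a simple retry wrapper around httr::GET
get_with_retry <- function(url, ..., tries = 3, wait_s = 5) {
  for (i in seq_len(tries)) {
    res <- httr::GET(url, ...)
    if (httr::status_code(res) == 200) return(res)
    message("Attempt ", i, " failed with status ", httr::status_code(res))
    Sys.sleep(wait_s)
  }
  stop("All ", tries, " attempts failed")
}

# used in place of the direct call above:
# res <- get_with_retry(URL,
#                       httr::add_headers(client_id = CLIENT_ID, client_secret = CLIENT_SECRET),
#                       query = PARAMS)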

Step 2: Create the data you want to see in the world

The data we have received from the API only has three categories:

Code
edm_raw %>%
  count(AlertStatus, sort = T)
# A tibble: 3 × 2
  AlertStatus         n
  <chr>           <int>
1 Not discharging   395
2 Offline            35
3 Discharging        33

However, the interactive map categorizes EDMs into:

  • Off-line

  • Discharging

  • and subdivides those ‘Not discharging’ into

    • ones that have discharged in the past 48 hours

    • those that have not discharged in the past 48 hours

If I’m to reproduce the interactive map, I will need to create a sub-category of “Not discharging” that replicates the extra categories. I could overwrite the AlertStatus field, but for clarity I’ll create a new field called ‘display_status’ to hold the refined categories. While we’re processing the raw data we will also tidy up a few things, as annotated in the code below:

Code
edm_df <- edm_raw %>%
  # make the column names standard
  janitor::clean_names() %>% 
  # geo-reference x & y (they are in eastings & northings so the crs starts as 27700)
  st_as_sf(coords = c("x", "y"), crs = 27700) %>%
  # my naming convention: geometries are called 'geom'
  rename(geom = geometry) %>%
  # because I'm using leaflet,  I convert from E&N to lat-long (crs -> 4326)
  sf::st_transform(crs = sf::st_crs(4326)) %>%
  # convert datetimes from chr to a more useful & natural type
  mutate(status_change = lubridate::as_datetime(status_change)) %>%
  mutate(most_recent_discharge_alert_start = lubridate::as_datetime(most_recent_discharge_alert_start)) %>%
  mutate(most_recent_discharge_alert_stop = lubridate::as_datetime(most_recent_discharge_alert_stop)) %>%
  # now augment the data-set with the useful new stuff...
  mutate(time_since_last_discharge = difftime(data_collection_dt, most_recent_discharge_alert_stop, units = "days")) %>%
  mutate(discharge_duration = difftime(coalesce(most_recent_discharge_alert_stop, data_collection_dt),  most_recent_discharge_alert_start, units = "days")) %>%
  mutate(display_status = case_when(
    (alert_status == 'Not discharging') & (time_since_last_discharge <= days(2)) ~ 'Discharge recorded in the last 48 hours',
    TRUE ~ alert_status
    )
  ) %>%
  # finally, it's a good habit to have a unique ID, so I tend to add one (location_name looks to be unique, but...)
  mutate(edm_id = row_number(), .before = 1)
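
A quick sanity check (a sketch; the counts will vary with every pull) confirms that display_status now carries the four categories used by the interactive map:

Code
# count the refined categories (drop the geometry first for a plain tibble)
edm_df %>%
  sf::st_drop_geometry() %>%
  count(display_status, sort = TRUE)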

Step 3: Emulation of the online EDM map

Here’s the code I’ve used to generate my version of Thames Water’s EDM Map (why not open both side-by-side and compare?). There’s plenty of googling and stack-exchange behind this code.

Code
# this helper function will be useful when I define the content of labels that are displayed
# when the mouse hovers on a monitor in the leaflet map 
asHTML <- function(text){
  attr(text, "html") <- TRUE
  class(text) <- c("html", "character")
  return(text)
}

# I'm going to have one layer for each distinct "state" of EDMs
# off
# dis(charging)
# not(discharging)
# recent(ly) discharging
#
offIcon <- awesomeIcons(
  icon = 'ios-close',
  iconColor = 'black',
  library = 'ion',
  markerColor = "lightgray"
)

disIcon <- awesomeIcons(
  icon = 'ios-close',
  iconColor = 'black',
  library = 'ion',
  markerColor = "red"
)

recentIcon <- awesomeIcons(
  icon = 'ios-close',
  iconColor = 'black',
  library = 'ion',
  markerColor = "orange"
)

notIcon <- awesomeIcons(
  icon = 'ios-close',
  iconColor = 'black',
  library = 'ion',
  markerColor = "darkgreen"
)

# to add a title to a leaflet map: https://stackoverflow.com/questions/49072510/r-add-title-to-leaflet-map
# I'm not sure why, but this doesn't seem to render properly when this is published
# but it works in preview so I'm leaving the code in place
library(htmltools)
tag.map.title <- tags$style(HTML("
  .leaflet-control.map-title { 
    transform: translate(-50%,20%);
    position: fixed !important;
    left: 50%;
    text-align: center;
    padding-left: 10px; 
    padding-right: 10px; 
    background: rgba(255,255,255,0.75);
    font-weight: bold;
    color: black;
    font-size: 14px;
  }
"))

title <- tags$div(
  tag.map.title, HTML(paste0("Emulation of Thames Water EDM map.<br>Data collected: ", data_collection_dt))
)  

# I like to create a base-map then add layers (you don't have to break the steps)
base_map <- leaflet() %>%
  #| Base groups
  addTiles(group = "OSM (default)") %>%
  addProviderTiles(providers$Stamen.Terrain, group = "Terrain") %>%
  addProviderTiles(providers$Stamen.TonerLite, group = "Toner Lite") %>%
  addControl(title, position = "topleft", className="map-title")

# now add layers to the basemap,  one for each state for the monitors
base_map %>%
  addAwesomeMarkers(data = edm_df %>%
                      filter(display_status == "Offline"),
  #|                  clusterOptions = markerClusterOptions(),
                    label = ~asHTML(paste0('Name: ', location_name, '<br>',
                                            'Watercourse: ', receiving_water_course, '<br>',
                                            'DateTime: ', data_collection_dt, '<br>',
                                            'Status: ', alert_status)),
                    icon = offIcon,
                    clusterId = "Offline EDM",
                    group = "Offline EDM"
  ) %>%
  addAwesomeMarkers(data = edm_df %>%
                      filter(display_status == "Discharging"),
 #|                   clusterOptions = markerClusterOptions(),
                    label = ~asHTML(paste0('Name: ', location_name, '<br>',
                                            'Watercourse: ', receiving_water_course, '<br>',
                                            'DateTime: ', data_collection_dt, '<br>',
                                            'Status: ', alert_status)),
                    icon = disIcon,
                    clusterId = "Discharging EDM",
                    group = "Discharging EDM"
  ) %>%
    addAwesomeMarkers(data = edm_df %>%
                        filter(display_status == "Not discharging"),
                      #|                    clusterOptions = markerClusterOptions(),
                      label = ~asHTML(paste0('Name: ', location_name, '<br>',
                                            'Watercourse: ', receiving_water_course, '<br>',
                                            'DateTime: ', data_collection_dt, '<br>',
                                            'Status: ', alert_status)),
                      icon = notIcon,
                      clusterId = "Not discharging EDM",
                      group = "Not discharging EDM"
    ) %>%
    addAwesomeMarkers(data = edm_df %>%
                        filter(display_status == "Discharge recorded in the last 48 hours"),
                      #|                    clusterOptions = markerClusterOptions(),
                      label = ~asHTML(paste0('Name: ', location_name, '<br>',
                                            'Watercourse: ', receiving_water_course, '<br>',
                                            'DateTime: ', data_collection_dt, '<br>',
                                            'Status: ', alert_status)),
                      icon = recentIcon,
                      clusterId = "Recent Discharge EDM",
                      group = "Recent Discharge EDM"
    ) %>%
    addLayersControl(
    baseGroups = c("OSM (default)", "Terrain", "Toner Lite"),
    overlayGroups = c("Discharging EDM", "Not discharging EDM", "Recent Discharge EDM", "Offline EDM"),
    options = layersControlOptions(collapsed = FALSE))

Note: The above map displays data from whenever this blog was last refreshed, so the EDM states will vary from those on the live interactive map. It’s very easy to create apps that pull data at some interval or when a button is pressed; in R I would use shiny (a minimal sketch follows), but there are equivalent frameworks in Python, Javascript etc.
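
For illustration, here is a minimal shiny sketch of that pattern. It assumes a hypothetical helper get_edm_df() wrapping the collection and preparation steps above; it is not the code behind any published app.

Code
# a sketch of a self-refreshing map (get_edm_df() is a hypothetical wrapper
# around the API call and data prep shown earlier in this post)
library(shiny)
library(leaflet)

ui <- fluidPage(leafletOutput("edm_map"))

server <- function(input, output, session) {
  edm_data <- reactive({
    invalidateLater(15 * 60 * 1000)  # re-pull the API every 15 minutes
    get_edm_df()
  })
  output$edm_map <- renderLeaflet({
    leaflet(edm_data()) %>%
      addTiles() %>%
      addCircleMarkers(label = ~location_name)
  })
}
# shinyApp(ui, server)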

API endpoint 2: Historic status of EDMs

As described in the background section, the historic data API1 accepts query parameters that allow what is requested and returned to be tuned. It is also limited to a maximum of 1000 records per call, but it can be called multiple times if the result-set is longer than 1000 records. To make my coding-life easier, I’ve written a function (called load_edm_historic_data()) that collects data from the historic end-point; when I call it, it gathers all available data on the EDMs (a sketch of such a pager follows the code below). The code requests EDM data back to 31st March 2019; however, it appears that the Thames Water API only publishes data from 1st April 2022. The API is very responsive: it only takes a couple of seconds to collect all the available data. If this request were to be made regularly, then previous results could be cached and only new data requested, by clipping the request to return only updates (e.g. to after YYYY-MM-DD as follows: GET {api_root}/data/STE/v1/DischargeAlerts?col_1=DateTime&operand_1=gt&value_1=YYYY-MM-DD)

Code
  # get everything by walking back in time without pre-defining an EDM list
  edm_historic_df <- load_edm_historic_data(client_id = CLIENT_ID,
                                            client_secret = CLIENT_SECRET,
                                            after_dt = as.Date("2019-03-31"),
                                            one_shot = FALSE)
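
load_edm_historic_data() itself lives in the functions.R file sourced at the top of this post, so it isn’t shown here. For the gist, below is a minimal sketch of how such a pager might work, under two assumptions: the response exposes an items list like the current-status endpoint, and the DateTime filter shown above can be used as a cursor.

Code
# a sketch of paging through the 1000-record-per-call limit (assumptions as above)
fetch_alerts_after <- function(client_id, client_secret, after_dt) {
  url <- "https://prod-tw-opendata-app.uk-e1.cloudhub.io/data/STE/v1/DischargeAlerts"
  pages <- list()
  cursor <- format(after_dt, "%Y-%m-%d")
  repeat {
    res <- httr::GET(url,
                     httr::add_headers(client_id = client_id,
                                       client_secret = client_secret),
                     query = list(col_1 = "DateTime",
                                  operand_1 = "gt",
                                  value_1 = cursor))
    if (httr::status_code(res) != 200) break
    page <- dplyr::bind_rows(httr::content(res)$items)
    if (nrow(page) == 0) break
    pages[[length(pages) + 1]] <- page
    if (nrow(page) < 1000) break   # a short page means we've reached the end
    cursor <- max(page$DateTime)   # advance the cursor (assumes a DateTime column)
  }
  dplyr::bind_rows(pages)
}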

The table below shows the content of the (first 20 rows of) data returned:

Code
edm_historic_df %>%
  head(20) %>%
  DT::datatable(filter = 'top') %>%
  # https://stackoverflow.com/questions/31921238/shrink-dtdatatableoutput-size
  div(style = "font-size: 75%")

My first reaction when viewing this dataset was a mix of surprise (that it was such a different shape to the “live” version), followed by an appreciation of why the data was a different shape, and then a slow sigh and the realisation that I would have to do considerable homework and reshaping if I was to turn it from a sequence of state-changes into something from which I could infer things like the duration of discharges, or even the number of discharges per EDM.

Comparing data returned from the “live” and historic endpoints

Live & historic datasets are quite different in how they are structured:

  • Historic dataset is event driven. Each record represents a change in state of a discharge monitor (e.g. from discharging to not discharging, or from offline to online).

  • ‘Live’ data is just a summary of the current state, with auxiliary information on what preceded the current state (e.g. the time since the state changed).

Note: The structure of the historic data is (almost) enough to reconstruct the “live” data at any point in time since the start of the historic record-set. I say almost because the historic dataset does not contain any information on initial conditions (the existence or state of discharge monitors at the very start of the historic dataset).

My simplistic way to help understand the live & historic datasets is to use an analogy of “fences vs. fenceposts”.

Thinking about the status of any given EDM over time: the historic data is like a list of fenceposts (at this point in time a discharge monitor changed to this state), whereas the live data is more like some point along the fence. As I write this I want to draw sketches; please forgive the low quality, it’s only to get the analogy across.

(Re)read the documentation!

Throughout this exploration I regularly reminded myself to read the online documentation, do something, then re-read the documentation! It’s an eternal source of amazement to me how the process of working in a field builds a cognitive map that provides “hooks” upon which to hang fine points. Certain details can be skipped or overlooked on first reading but gradually become more apparent and valuable on subsequent revisits. While the technical documentation on how to collect the data is good, there is little or no documentation on how to interpret the fields in the resulting dataframe. I have done some investigation into the structure of the historic dataframe by comparing and contrasting with the “live” results. I have inferred a few things, and yes, I know, that’s a dangerous and often painful road to go down with someone else’s data. From what I can see, the main differences between the live & historic results are:

  • The live data reports the status at any given time, whereas the historic status must be inferred by processing the start/stop date-times provided for each monitor.

  • The live datastream reports, in a simple-to-consume way, whether a station is discharging, not discharging or offline. However, there are cases in the historic dataset where a station can be simultaneously offline and reporting an event. This is because there are two ways to record the starts & ends of events: one pair (simply labelled “Start” and “Stop”) bracket discharge events, while the other pair (labelled “Offline start” and “Offline stop”) appear to bracket the validity of readings (i.e. the periods when the station was offline, rather than the start and end of invalid discharge events during periods when the station was offline). A toy example follows this list.

  • There are some EDMs present in the “live” stream that are not present in the historic stream. This is perfectly feasible, as the historic stream only reports starts & stops; so, for example, sites that have never changed state during the period will not be present in the results. While perfectly feasible, it is not helpful, as it raises the question of the “initial state” of any EDM. Because the records only go back to 2022-04-01, any EDM that has not changed state since that date (has been permanently offline, has never discharged, or presumably has never stopped discharging) does not appear at all in the historic dataset.
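
To make the two bracket types concrete, here is a toy event sequence (invented values) for a single site that chatters while offline; note that both pairs of brackets can be open at once:

Code
# invented illustration: Start/Stop bracket a discharge event, while
# Offline start/Offline stop bracket the period the monitor was offline
tribble(
  ~location_name, ~alert_type,     ~date_time,
  "Example STW",  "Offline start", "2022-08-01 00:00:00",
  "Example STW",  "Start",         "2022-08-02 09:00:00",
  "Example STW",  "Stop",          "2022-08-02 11:00:00",
  "Example STW",  "Offline stop",  "2022-08-03 00:00:00"
)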

Creating the data you want to see in the world

In the previous section I showed some examples of the (event based) historic EDM status dataset. I’ll consider four use cases that I might want to explore using this data:

  1. As a performance manager I want to know which sites have the most discharge events in a given period so that I can investigate and check whether the monitors are “chattering” (reporting too many on & off state changes).

  2. As an environmentalist I want to know the total duration of discharges for each site so that I can highlight those that have been discharging for the longest time.

  3. As an open water swimmer I want to know the state of the sewage discharge monitors on a given day. Not just today, but on some day in the recent past when I went swimming, so that I can check whether there were any discharges running upstream of where I swam, at the point in time I was in the water.

  4. As a modeller, I would like to know the total number of sites discharging every day so that I can explore how discharges link to environmental factors such as rainfall.

Use case 1: Which sites have the most discharge events in a given period?

This use case can be answered using the historic data “as-is” by simply counting the number of starts & stops in any given period. The example below shows the top ten across the whole period, but the data could be sliced by a user-defined period and diced into months or whatever is required.

Code
chatter_limit <- 10

most_frequently_discharging_edms <- edm_historic_df %>%
  count(location_name, alert_type, sort = T) %>%
  pivot_wider(names_from = alert_type, values_from = n) %>%
  rowwise() %>% 
  mutate(total_events = sum(c_across(where(is.numeric)), na.rm = T)) %>%
  ungroup() %>% 
  arrange(desc(total_events)) %>%
  mutate(rank = row_number(), .before = 1) %>%
  head(chatter_limit)

most_frequently_discharging_edms %>%
  DT::datatable(filter = 'top')

Use case 2: Reporting the total duration of discharges by site

This use case is less straightforward than use case 1, because to answer it we need to take those historic event “fenceposts”, build them into a sequence over time, and then measure the time between them. The task of joining start-stop events is slightly more complex than it perhaps initially appears, as we want to pair each start with its “nearest subsequent stop”, and repeat the process for offline starts & stops. Fortunately, open-source data manipulation packages come to the rescue. The code below reformats the historic data and then summarises the result.

Code
#| first I write a function
#| (as I'm going to be doing the same pairing TWICE)
#| one for the start - stop events for each EDM
#| and again for the offline start / stop events
pair_edm_events <- function(this_df, start = "start", stop = "stop") {
  starts_df <- this_df %>%
    filter(alert_type == start) %>%
    select(location_name, alert_type, date_time)
  
  stops_df <- this_df %>%
    filter(alert_type == stop) %>%
    select(location_name, alert_type, date_time)
  
  paired_df <- starts_df %>%
    left_join(stops_df, by = join_by(location_name, closest(date_time <= date_time)), suffix = c(".start", ".stop")) %>%
    mutate(event_hrs = difftime(date_time.stop, date_time.start,units="hours") )
  
  return(paired_df)
}
edm_ss_events <- pair_edm_events(edm_historic_df, start = "Start", stop = "Stop")
edm_offline_events <- pair_edm_events(edm_historic_df, start = "Offline start", stop = "Offline stop")

top_ten_edms_by_discharge_duration <- edm_ss_events %>%
  group_by(location_name) %>%
  summarise(n = n(), total_hrs = sum(event_hrs), max_hrs = max(event_hrs)) %>%
  arrange(desc(total_hrs)) %>%
  head(10)

top_ten_edms_by_discharge_duration
# A tibble: 10 × 4
   location_name                         n total_hrs     max_hrs      
   <chr>                             <int> <drtn>        <drtn>       
 1 Dartford Creek Storm                  1 5984.75 hours 5984.75 hours
 2 Weybridge                            42  885.25 hours  141.50 hours
 3 Selborne                             14  855.25 hours  445.00 hours
 4 Hatfield Heath                        8  828.00 hours  379.25 hours
 5 Cirencester                          61  824.75 hours  309.75 hours
 6 Charlton Storm Relief                 3  779.00 hours  765.25 hours
 7 Moreton-In-Marsh (Primrose Court)    58  776.00 hours  212.25 hours
 8 Holmwood                             36  766.75 hours  173.50 hours
 9 Crondall                             17  763.50 hours  458.50 hours
10 Weston-On-The-Green STW              15  752.00 hours  186.50 hours
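
The join_by(closest(...)) idiom is doing the heavy lifting in pair_edm_events(), so here is a toy example (invented dates) showing how each start is paired with its nearest subsequent stop:

Code
# toy illustration of the closest() join used in pair_edm_events()
starts <- tibble(location_name = "A",
                 date_time = as_datetime(c("2022-06-01", "2022-06-03")))
stops  <- tibble(location_name = "A",
                 date_time = as_datetime(c("2022-06-02", "2022-06-04")))

left_join(starts, stops,
          by = join_by(location_name, closest(date_time <= date_time)),
          suffix = c(".start", ".stop"))
# 2022-06-01 pairs with 2022-06-02, and 2022-06-03 pairs with 2022-06-04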

Now that we’ve reshaped the data to pair start & stop events we can plot them over time in a way that I hope will illustrate my fence-and-fencepost analogy a little more clearly:

Code
edm_ss_events %>%
  # I could filter by the top 10 here, but I want to rank them according to length too
  inner_join(top_ten_edms_by_discharge_duration) %>%
  arrange(desc(total_hrs)) %>%
  mutate(location_name = as_factor(location_name)) %>%
  mutate(location_name = fct_reorder(location_name, total_hrs)) %>%
  ggplot() +
  coord_flip()+
  geom_segment( aes(x=location_name,
                    xend=location_name,
                    y=date_time.start - lubridate::days(1),
                    yend=date_time.stop + lubridate::days(1)),
                size = 8,
                lineend = "butt",
                color="black") +
  geom_segment( aes(x=location_name,
                    xend=location_name,
                    y=date_time.start,
                    yend=date_time.stop,
                    color=location_name),
                size = 7) +
  # geom_point( aes(x=location_name, y=date_time.start), shape = 20, color="green", size=7 ) +
  # geom_point( aes(x=location_name, y=date_time.stop), shape = 15, color="red", size=3 ) +
  theme(
    legend.position = "none",
  ) +
  labs(title = "Top 10 longest discharging EMDs since 2022-04-01",
       subtitle = "Sites have been ranked in descending total discharge duration", caption = "data from Thames Water {api_root}/data/STE/v1/DischargeAlerts", tag = "", x = "EDM location", y = "date")

The previous plot could have been adapted to show the duration in hours and/or the count of individual events, but I hope the point is made that use case 2 required some manipulation of the raw historic dataset.

Note that the plot shows that most of the discharges were in January and March 2023. This is linked to periods of high rainfall (not shown here). Most of 2022 was extremely dry, so the load on the sewage works (due to combined foul and storm sewerage systems) was low, and hence the likelihood of needing to discharge because of storm loadings was minimal. However, in 2023 the south of England saw considerable rainfall, which in turn loaded the storm sewerage system. Because many storm and foul systems in the UK are combined, the storm flows joined the foul sewage system and inundated sewage treatment works, some of which overflowed and discharged the surfeit into the environment.

Use case 3: What was the state of the EDMs at an arbitrary point some time ago?

I’ve published a shiny app (here) that allows users to search from arbitrary points on the river network and discover any EDMs that were discharging upstream (answering the question “has any sewage been discharged recently into the river upstream of where I’d like to swim?”).

In this app, the user can select a range of dates to explore the state of the river network. The app uses cached cuts of the “live” API results to inform the status of EDMs. In this section I’ll explore how the historic dataset can be queried and reshaped further to extract the state of EDMs at arbitrary points in time (within the bounds of the historic dataset).

First I will write a function that will extract the state of the EDMs given a target datetime. Then I will compare this against a cached copy of data extracted from the “live” endpoint at that target datetime.

Code
#| this function is more complicated than I initially imagined.
#| the complexities arise because some EDMs can have a number of coincident states
#| e.g. they can be marked as offline *and* discharging
#| having compared the live data with historic data, and the outputs of this function,
#| I am beginning to assume that sites can be offline (unreliable) but still chatter away
#| The function does the following:
#| 1) creates 'current_discharges' ... a list of EDMs that were discharging at 'historic_dt'
#|    by filtering any start-stop events that bracket the 'historic_dt' (start <= historic_dt <= stop)
#|    NOTE this list includes offline and normal start/stops
#| 2) creates a 'recent_discharges' dataframe by checking for active events up to 2 days previous
#| 3) creates a 'not discharging' list of EDMs where historic_dt is not bracketed by start-stop events
#| 4) these three lists are then combined and any conflicting (duplicate) states are resolved
#|    by (conveniently) ranking the status in reverse alphabetical order such that
#|    (O)ffline trumps
#|    (D)ischarging trumps
#|    (N)ot discharging.
get_edm_status_at <- function(this_df, historic_dt) {
  if(historic_dt > max(this_df$date_time.stop, na.rm = T)) {
    warning(str_c(historic_dt, " is out of range of the data, which ends: ", max(this_df$date_time.stop, na.rm = T)))
  }
  if(historic_dt < min(this_df$date_time.start, na.rm = T)) {
    warning(str_c(historic_dt, " is out of range of the data, which starts: ", min(this_df$date_time.start, na.rm = T)))
  }
  # this code is here to help debugging
  if(F) {
    this_df <- this_df %>% filter(location_name == "Petersham Road")
  }
  
  current_discharges <- this_df %>%
    filter(historic_dt >= coalesce(date_time.start, lubridate::as_datetime("2000-01-01 00:00:00")) ) %>%
    filter(historic_dt <= coalesce(date_time.stop, lubridate::as_datetime("3000-01-01 00:00:00"))  ) %>%
    mutate(date_time = historic_dt) %>%
    transmute(location_name, date_time, alert_status = alert_type.start) %>%
    mutate(alert_status = ifelse(alert_status %in% c("Start", "Stop"), "Discharging", alert_status)) %>%
    mutate(alert_status = ifelse(alert_status %in% c("Offline start", "Offline stop"), "Offline", alert_status)) %>%
    mutate(alert_status = as.character(alert_status)) %>%
    left_join(
      this_df %>%
        mutate(previous_alert = case_when(
          str_detect(alert_type.start, "Offline") ~ "Offline",
          alert_type.start == "Start" ~ "Discharging",
          TRUE ~ "Unknown")) %>% 
        transmute(location_name,
                  previous_alert,
                  date_time = date_time.stop),
      by = join_by(location_name, closest(date_time >= date_time)), suffix = c("", ".previous")
    ) %>%
    mutate(time_since_last_event = difftime(date_time, date_time.previous, units = "days")) %>%
    mutate(display_status = alert_status)
  
  # get the EDMs that recently discharged
  recent_discharges <- this_df %>%
    mutate(date_time = historic_dt) %>%
    filter(!str_detect(alert_type.start, "Offline")) %>%
    mutate(time_since_last_stop = difftime(historic_dt, date_time.stop, units = "days")) %>%
    # the ones that were discharging
    # but stopped at any point in the last two days
    filter(time_since_last_stop > days(0)) %>% #  not in the future
    filter(time_since_last_stop <= days(2)) %>%
    # get the most recent (if there has been more than 1 in the past 2 days)
    group_by(location_name) %>%
    arrange(time_since_last_stop) %>%
    slice(1) %>%
    ungroup() %>%
    # drop anything that's in the "currently" alerting or offline list
    anti_join(current_discharges %>% select(location_name)) %>%
    mutate(alert_status = "Discharge recorded in the last 48 hours") %>%
    # make the data you want to see in the world
    transmute(location_name, date_time, alert_status, display_status = alert_status, status_changed = date_time.stop)
  
  # get the EDMs that are not discharging:
  not_discharging <- this_df %>%
    mutate(date_time = historic_dt) %>%
    # drop anything that's in the "currently" discharging, or offline list
    filter(!(location_name %in% current_discharges$location_name)) %>%
    filter(!(location_name %in% recent_discharges$location_name)) %>%
    mutate(alert_status = "Not discharging") %>%
    # make the data you want to see in the world
    transmute(location_name, date_time, alert_status, display_status = alert_status, status_changed = date_time.stop) %>%
    filter(historic_dt >= coalesce(status_changed, lubridate::as_datetime("2000-01-01 00:00:00"))  ) %>%
    # get the most recent (if there has been more than 1)
    group_by(location_name) %>%
    arrange(desc(status_changed)) %>%
    slice(1) %>%
    ungroup()

  
  # combine the results of stuff discharging "now" and stuff discharging recently
  # note: we don't want ambiguous status, so if it's discharging and offline, then it's offline
  # note: ambiguity can creep in within the result df; it's not (solely) because we're joining recent_discharges
  result <- current_discharges %>%
    transmute(location_name, date_time, alert_status, display_status, status_changed = date_time.previous) %>%
    bind_rows(recent_discharges) %>%
    bind_rows(not_discharging) %>%
    group_by(location_name) %>%
    arrange(desc(alert_status)) %>%
    slice(1)
  
  # finally: note, this won't return anything that isn't discharging now
  return(result)
}

get_download_date <- function(this_edm_df) {
  target_dt <- this_edm_df %>%
    as_tibble() %>%
    select(location_name, most_recent_discharge_alert_stop, time_since_last_discharge) %>%
    mutate(date_collected = most_recent_discharge_alert_stop + time_since_last_discharge) %>%
    summarise(date_collected = min(date_collected, na.rm = T))
  target_dt <- target_dt[1,]$date_collected
  return(target_dt - minutes(60))
}

Now let’s use the function to extract the status of EDMs on 2023-03-09 at 10:52:24:

Code
# test that the reconstructor works
edm__live_df <- readRDS("C:/Users/leoki/CODE/R-Home/shiny/openRivers/cacheOpenRivers/edm_df_2023-03-09.Rds")

target_dt <- get_download_date(edm__live_df)

# get results from historic
edm_status_at <- get_edm_status_at(this_df = edm_ss_events %>%
                                     bind_rows(edm_offline_events),
                                   historic_dt = lubridate::as_datetime(target_dt))

# THIS SHOULD RETURN NOTHING
edm__live_df %>%
  as_tibble() %>%
  filter(location_name %in% edm_status_at$location_name) %>%
  transmute(location_name, live_status = display_status) %>%
  left_join(
    edm_status_at %>%
      transmute(location_name, historic_status = display_status) 
  ) %>%
  filter(live_status != historic_status) %>%
  DT::datatable(filter = 'top')

The status of all but seven of the EDMs matched (7 in more than 400 EDMs is a match rate of about 98.5%) when reconstructing the state of the EDMs at the arbitrary point in time at which the “live” data had been collected and cached. There are still a couple of open questions at this point about what exactly is driving the seven differences, but I’ll stop here. I’m not building any production solutions with this dataset, so I’m happy that my point about the value of reshaping historic data has been made. I’ll leave it as an exercise for the reader to dig further and debug. If I were to pursue this, my thoughts would be to explore:

  • whether the historic data and live data are compatible

  • whether I’ve chosen exactly the right date-time to rebuild the “live” data from the “historic” data

  • whether my interpretation of the historic data is correct (which might require access to subject matter expertise)

  • or whether I’ve simply left a bug somewhere in the code.

Use case 4: What is the total number of sites discharging each and every day?

So far we’ve counted the number of events, paired start/stop events to calculate discharge durations, and even back-calculated the state of every EDM at some point in time. It’s only a small step now to count the number of EDMs discharging each day.

Code
#| The following code creates a series of dates
event_by_day <- seq.Date(from = as.Date(min(edm_historic_df$date_time)),
                        to = as.Date(max(edm_historic_df$date_time)),
                        by = "day") %>%
  enframe(name = NULL, value = "dt") %>%
  select(dt) %>%
  # then left joins the events (left join means we'll keep every date 'dt')
  left_join(
    # we're interested in both start/stop AND offline-start/offline-stop events
    edm_ss_events %>%
      bind_rows(edm_offline_events) %>%
      # slightly hacky,  but good enough for now...
      # near the beginning there may be stops without starts
      # near the end there may be starts that haven't stopped yet
      # these cases have NA representing the missing datetimes
      # I'm forcing NA starts to be 2000-01-01 (before the first possible start)
      # and the NA ends to be after the last possible end of event_by_day
      mutate(date_time.start = coalesce(date_time.start, lubridate::as_datetime("2000-01-01 00:00:00")) ) %>%
      mutate(date_time.stop = coalesce(date_time.stop, lubridate::as_datetime("3000-01-01 00:00:00")) ) %>%
  #      filter(receiving_water_course == "River Thames") %>%
  mutate(isOffline = ifelse( str_detect(alert_type.start, "Offline") |
                               str_detect(alert_type.stop, "Offline"),
                             "offline", "discharging")) %>%
  select(location_name, isOffline, contains("date_time")),
by = join_by(dt >= date_time.start, dt <= date_time.stop), suffix = c(".this", ".that")
  ) %>%
  # # I want to plot watercourse
  # left_join(
  #   edm_historic_df %>%
  #     count(location_name, receiving_water_course) %>%
  #     select(-n)
  # ) %>%
  # count(receiving_water_course, dt, isOffline) %>%
  # pivot_wider(id_cols = c(receiving_water_course, dt), names_from = isOffline, values_from = n, values_fill = 0) %>%
  count(dt, isOffline) %>%
  pivot_wider(id_cols = c(dt), names_from = isOffline, values_from = n, values_fill = 0) %>%
  mutate(total = offline + discharging) %>%
  mutate(offline_pcnt = offline/ max(total)) %>%
  mutate(discharging_pcnt = discharging/ max(discharging))

We can plot this as a timeseries.

Code
event_by_day %>%
  select(dt, discharging, offline) %>%
  pivot_longer(c(discharging, offline), names_to = "edm_state", values_to = "count") %>%
  ggplot() +
  geom_line(aes(x = dt, y = count, colour = edm_state))  +
  labs(title = "The number of EDMs discharging each day",
       subtitle = "There was a surge in the number of EDMs discharging in January 2023", caption = "Data from Thames Water historic API", x = "date", y = "Number of EDMs discharging")

It’s only a small leap now to imagine how we could compare / correlate the number of discharges each day with other timeseries such as rainfall, SMD, river levels etc.
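
As a hypothetical sketch: rainfall_df below (with columns dt and rain_mm) stands in for any daily rainfall series you have to hand; it is not part of the Thames Water API.

Code
# correlate the daily discharge count with a (hypothetical) daily rainfall series
event_by_day %>%
  inner_join(rainfall_df, by = "dt") %>%
  summarise(correlation = cor(discharging, rain_mm, use = "complete.obs"))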

Play!

The value of open-data is that it empowers exploration and innovation: if you have access to the data, then you can begin to create new insights. Here’s a plot of all events that have run for more than (arbitrarily) 3 days. I’ve grouped them by watercourse to explore which watercourses appear to be most affected by discharges.

Code
#| simple EDA of the Thames EDM data-set ----
p <- edm_df %>%
  group_by(receiving_water_course) %>%
  filter(max(discharge_duration) > days(3)) %>%
  ggplot() +
  geom_boxplot(aes(fct_reorder(receiving_water_course, as.numeric(discharge_duration)),
                   discharge_duration,
                   fill = receiving_water_course)) +
  coord_flip() + 
  theme(legend.position="none") +
  labs(title = "Water Courses that have had events lasting more than 3 days",
       subtitle = paste0("Data collected: ", data_collection_dt),
       y = "discharge duration (days)",
       x = "water course name",
       caption  = "Data from Thames Water's open API: https://data.thameswater.co.uk/s/apis")

#| plotly is good for interactive ggplots:
plotly::ggplotly(p)

Note: Care must be taken when inferring anything from data-sets, especially unfamiliar ones. Some reasons why the plot might be misleading:

  • Are all watercourses equally instrumented? Or could we be plotting only those with measurements on outlets?

  • What about those instruments that are offline? Could our view of the world be biased without considering those off-line monitors, especially if (say) sensors are more likely to go off-line after exposure to discharged media?

  • Are all discharges equal? This data-set does not contain information on the volume or content of discharges, neither is there any information available from the Thames Water APIs about the size of the watercourse into which the waste is being discharged.

Where next?

It’s over to you to continue exploring the data and ways to visualize, summarize and generate insight. Thames Water have taken a very progressive step by creating the API and making this data available. I hope this post has been interesting; I look forward to other people’s apps / posts / summaries of this valuable open data-set.

Personally, I will be continuing to explore this data and joining it to other open data-sets, but that is for another post…

Footnotes

  1. GET {api_root}/data/STE/v1/DischargeAlerts↩︎